package com.ruyuan.eshop.push.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.concurrent.NamedDaemonThreadFactory;
import com.ruyuan.eshop.common.message.PlatformMessagePushMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionMessage;
import com.ruyuan.eshop.push.domain.dto.MessageSendDTO;
import com.ruyuan.eshop.push.service.MessageSendService;
import com.ruyuan.eshop.push.service.MessageSendServiceFactory;
import com.ruyuan.eshop.push.service.impl.FactoryProducer;
import com.sun.org.apache.bcel.internal.generic.FADD;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.print.DocFlavor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * RocketMQ concurrent listener that consumes platform promotion messages and pushes each one
 * through the message-send service selected by the message's inform type.
 *
 * <p>Every message of a delivered batch is handled on a shared business thread pool. The batch
 * is acknowledged only when every message was handled without an exception; otherwise the whole
 * batch is handed back to the broker for redelivery. A Redis entry keyed by
 * {@code message.cacheKey()} is written after a successful push so that redelivered messages
 * which were already pushed are skipped.
 *
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormPromotionListener implements MessageListenerConcurrently {

    /**
     * Provider of message-push factories, looked up by inform type.
     */
    @Autowired
    private FactoryProducer factoryProducer;

    /**
     * Redis access used for the "already pushed" idempotency markers.
     */
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Core size of the push thread pool; the maximum size is twice this value.
     */
    private static final int PERMITS = 30;

    /**
     * Lazily creates the shared push thread pool.
     *
     * <p>The pool is built when this holder class is first initialized, i.e. on the first access
     * to {@link #INSTANCE}. JVM class initialization guarantees that this happens exactly once and
     * that every thread observes the fully constructed pool, so no caller can ever see
     * {@code null}.
     */
    private static final class ExecutorHolder {

        // Core pool size is 30 threads, maximum pool size is 60 threads. When the bounded queue
        // (1000 entries) is full and all threads are busy, the submitting thread runs the task.
        private static final ThreadPoolExecutor INSTANCE = new ThreadPoolExecutor(
                PERMITS, PERMITS * 2,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1000),
                NamedDaemonThreadFactory.getInstance("consumePromotionMsg"),
                new ThreadPoolExecutor.CallerRunsPolicy());

        private ExecutorHolder() {
        }
    }

    /**
     * Consumes a batch of messages concurrently.
     *
     * @param msgList batch of messages delivered by the broker
     * @param context consume context supplied by RocketMQ (not used here)
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when every message of the batch
     *         was handled without an exception, otherwise
     *         {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} so the batch is redelivered
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            // Original author's sizing rationale: with 5 consumer machines each instance receives
            // roughly 2 million messages, pulled batch by batch. A single SDK push to the
            // third-party platform takes about 100~200 ms, so pushing serially would take
            // 2,000,000 * 200 ms = 400,000 s, i.e. dozens of hours. Pushing from a thread pool
            // (up to 60 threads per machine, each managing about 5 pushes per second) brings a
            // full push to tens of millions of users down to somewhere between tens of minutes
            // and two or three hours.

            // Use the dedicated business thread pool instead of the common ForkJoinPool.
            ThreadPoolExecutor executor = ExecutorHolder.INSTANCE;
            List<CompletableFuture<AltResult>> futureList = msgList.stream()
                    .map(e -> CompletableFuture.supplyAsync(() -> handleMessageExt(e), executor))
                    .collect(Collectors.toList());

            // Wait for every task to finish and collect the exceptions of the failed ones.
            List<Throwable> failures = futureList.stream()
                    .map(CompletableFuture::join)
                    .map(result -> result.ex)
                    .filter(ex -> ex != null)
                    .collect(Collectors.toList());

            if (!failures.isEmpty()) {
                // Surface the first failure; it is logged by the catch block below.
                throw failures.get(0);
            }
        } catch (Throwable e) {
            log.error("consume error,平台优惠券消费失败", e);
            // This round of consumption failed; ask the broker to redeliver the batch later.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Handles a single message: skips it if it was already pushed, otherwise builds the push
     * payload, sends it and records the success in Redis.
     *
     * @param messageExt the raw RocketMQ message whose body is a JSON-encoded
     *                   {@link PlatformPromotionMessage}
     * @return a result whose {@code ex} is {@code null} on success (including the
     *         already-pushed case) and holds the caught exception on failure
     */
    private AltResult handleMessageExt(MessageExt messageExt) {
        try {
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            // NOTE(review): assumes the producer encodes the JSON body as UTF-8 — confirm.
            String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            log.debug("执行平台发送通知消息逻辑，消息内容：{}", msg);
            PlatformPromotionMessage message = JSON.parseObject(msg, PlatformPromotionMessage.class);

            // Idempotency control: a non-blank marker means this message was already pushed.
            if (StringUtils.isNotBlank(redisTemplate.opsForValue().get(message.cacheKey()))) {
                return new AltResult(null);
            }

            // Obtain the message service factory for this inform type.
            MessageSendServiceFactory messageSendServiceFactory = factoryProducer
                    .getMessageSendServiceFactory(message.getInformType());

            // Message sending service component.
            MessageSendService messageSendService = messageSendServiceFactory
                    .createMessageSendService();

            // Build the push message.
            PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
                    .informType(message.getInformType())
                    .mainMessage(message.getMainMessage())
                    .userAccountId(message.getUserAccountId())
                    .message(message.getMessage())
                    .build();

            MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);

            messageSendService.send(messageSendDTO);

            // After a successful send, record the success in Redis.
            redisTemplate.opsForValue().set(message.cacheKey(), UUID.randomUUID().toString());

            log.info("消息推送完成，messageSendDTO:{}", messageSendDTO);
            // NOTE(review): fixed 200 ms pause after every push; presumably simulates or throttles
            // the third-party push latency — confirm whether it is still required.
            Thread.sleep(200);
            return new AltResult(null);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread (a pool worker, or the consumer
            // thread under CallerRunsPolicy) can still observe the interruption.
            Thread.currentThread().interrupt();
            return new AltResult(e);
        } catch (Exception e) {
            return new AltResult(e);
        }
    }

    /**
     * Result carried by the CompletableFuture for tasks that have no return value.
     * A {@code null} {@code ex} means the task succeeded; a non-null {@code ex} means the task
     * failed and holds the exception that was caught.
     */
    private static class AltResult {

        final Throwable ex;

        public AltResult(Throwable ex) {
            this.ex = ex;
        }
    }

}
